Insper - Jul/2020 - São Paulo, Brazil
User and business data for open establishments in Toronto were selected to train a binary classification model of review ratings, so as to predict whether the rating of a business the user has not yet reviewed would be good or bad. The recommendation follows the highest probability that the user would give that place a high rating.
Build a recommendation system for well-rated businesses aligned with the user's profile.
Data from the following datasets, provided by Yelp for academic studies, will be analyzed:
Check-ins - check-in history per business
Below are the Spark environment settings. Since the notebook was shared via GitHub and processed on different machines, the different required configurations were kept.
#import findspark as fs
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
from wordcloud import WordCloud, ImageColorGenerator
import os
from pyspark.sql import SparkSession
# from pyspark.sql.types import *
from pyspark.sql import functions as f
from pyspark.sql.window import Window
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import col, count, explode, split, array, concat_ws
from pyspark.sql.types import ArrayType, IntegerType
import pandas as pd
import seaborn as sns
#from tqdm.notebook import tqdm
sns.set(style="ticks", palette="pastel")
%matplotlib inline
#Mac Local (Viviane)
#spark_location='/Users/vivi/server/spark' # Set your own
#java8_location= '/Library/Java/JavaVirtualMachines/jdk1.8.0_251.jdk/Contents/Home/' # Set your own
#os.environ['JAVA_HOME'] = java8_location
#fs.init(spark_home=spark_location)
#datapath = 'C:\\Users\\RuWindows\\Desktop\\PI\\yelp_dataset\\' #Marcelo
datapath = '../data/yelp' # AWS server
#datapath = 'data' #Viviane
#files = sorted(os.listdir(datapath))
#files
#!head data/yelp_academic_dataset_review.json
# Spark Session
spark = SparkSession.builder \
.master('local[*]') \
.appName('Integradora Yelp') \
.config("spark.ui.port", "4060") \
.getOrCreate()
#spark = SparkSession.builder \
# .master('local[8]') \
# .appName('Yelp Integradora') \
# .getOrCreate()
sc = spark.sparkContext
spark#.stop()
usr_raw = spark.read.json(datapath+'/yelp_academic_dataset_user.json')
rv_raw = spark.read.json(datapath+'/yelp_academic_dataset_review.json')
bz_raw = spark.read.json(datapath+'/yelp_academic_dataset_business.json')
tp_raw = spark.read.json(datapath+'/yelp_academic_dataset_tip.json')
ch_raw = spark.read.json(datapath+'/yelp_academic_dataset_checkin.json')
# Viewing the schema
#bz_raw.printSchema()
ch = ch_raw.withColumn('dt', f.explode(f.split(f.col('date'),', ')))
ch_raw.show()
ch.createOrReplaceTempView('ch')
ch1 = spark.sql('''
SELECT business_id, dt,
DATE(dt) AS date,
DAYOFWEEK(dt) AS dow,
MONTH(dt) AS month,
YEAR(dt) AS year,
HOUR(dt) AS hour,
DAY(dt) AS day
FROM ch
''')
ch1.show()
ch1.createOrReplaceTempView('ch1')
ch2 = spark.sql('''
SELECT business_id, year, month, dow, day,
COUNT(date) AS count
FROM ch1
GROUP BY business_id, year, month, dow, day
ORDER BY count DESC
''')
ch2.createOrReplaceTempView('ch2')
df = spark.sql('''
SELECT *
FROM ch2
WHERE business_id = 'BxCzy1WOVxOrr_G7V4BIvg'
''').toPandas()
df2 = df.groupby(['month','dow'])['count'].sum().unstack('dow').fillna(0)
plt.figure(figsize = (25,7))
sns.heatmap(data = df2, cmap="YlGnBu", annot = True)
From the heatmap, we can see that this place is busiest between January and March, mainly on weekends (days of week 7 and 1, i.e. Saturday and Sunday). For modeling, the ideal would be to use the mode (most frequent value) or the median of the check-ins, but these cannot be computed exactly with parallel processing. The mean may not be representative, since the peak activity is concentrated on weekends.
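As a side note, Spark can at least approximate the median in a distributed way through DataFrame.approxQuantile; a minimal sketch, assuming the ch2 DataFrame built above (the name median_checkins is illustrative):
# Approximate median of the daily check-in counts aggregated in ch2.
# The third argument is the relative error, so the result is approximate, not exact.
median_checkins = ch2.approxQuantile('count', [0.5], 0.01)[0]
print(f'Approximate median of daily check-in counts: {median_checkins}')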
tp_raw.createOrReplaceTempView('tp')
tp_raw.show()
tp_usr = spark.sql('''
SELECT user_id,
count(text) AS tips_counter,
sum(compliment_count) as total_compliments
FROM tp
GROUP BY user_id
ORDER BY total_compliments DESC
''')
tp_usr.show()
tp_usr.createOrReplaceTempView('tp_usr')
tp_bz = spark.sql('''
SELECT business_id,
count(text) AS tips_counter,
sum(compliment_count) as total_compliments
FROM tp
GROUP BY business_id
ORDER BY total_compliments DESC
''')
tp_bz.createOrReplaceTempView('tp_bz')
tp_bz.show()
The total number of tips and their reach per business will be used in the modeling.
dfs = []
for x in ["hours", "attributes"]:
cols = bz_raw.select(f"{x}.*").columns
for col in cols:
try:
dfs.append(dfs[-1].withColumn(col, f.col(f"{x}.{col}")))
except IndexError:
dfs.append(bz_raw.withColumn(col, f.col(f"{x}.{col}")))
bz = dfs[-1].drop("hours", "attributes")
bz.createOrReplaceTempView("bz")
bz.printSchema()
bz1 = spark.sql('''
SELECT *
FROM bz
WHERE city == 'Toronto'
AND is_open == 1
''')
bz1.groupBy('is_open').count().show()
cols = bz_raw.select('attributes.*').columns
Unique values in each attribute column. Columns containing lists will be dropped from the dataset to simplify the analysis. The remaining ones will be encoded as follows:
Attributes with True/False answers: True → 2, False → 1, None/null → 0.
Attributes holding a description of the characteristic were replaced by numbers, for example NoiseLevel: quiet → 1, average → 2, loud → 3, very_loud → 4.
[bz1.groupBy(cols[x]).count().show() for x in range(len(cols))]
unwanted = ['Ambience','BestNights','BusinessAcceptsBitcoin','BusinessParking','BYOBCorkage','DietaryRestrictions','GoodForMeal','HairSpecializesIn','Music','Open24Hours','RestaurantsCounterService']
bz2 = bz1.drop('Ambience','BestNights','BusinessAcceptsBitcoin','BusinessParking','BYOBCorkage','DietaryRestrictions','GoodForMeal','HairSpecializesIn','Music','Open24Hours','RestaurantsCounterService',
'Sunday','Monday','Tuesday','Wednesday','Thursday','Friday','Saturday','address','postal_code','state','city','is_open')
for col in unwanted:
cols.remove(col)
bz3 = bz2.fillna('0')
Encoding of the attributes as described above. Processing with REGEX was more efficient than PySpark's .replace function.
bz4 = bz3.replace({'True':'2', 'False':'1','None':'0','null':'0'}, subset=cols)
bz5 = bz4.withColumn('AgesAllowed',f.regexp_replace(f.col('AgesAllowed'), "u'allages'",'0'))
bz6 = bz5.withColumn('AgesAllowed',f.regexp_replace(f.col('AgesAllowed'), "u'19plus'",'19'))
bz7 = bz6.withColumn('Alcohol',f.regexp_replace(f.col('Alcohol'), "'none'|u'none'",'0'))
bz8 = bz7.withColumn('Alcohol',f.regexp_replace(f.col('Alcohol'), "'beer_and_wine'|u'beer_and_wine'",'1'))
bz9 = bz8.withColumn('Alcohol',f.regexp_replace(f.col('Alcohol'), "'full_bar'|u'full_bar'",'2'))
bz10 = bz9.withColumn('NoiseLevel',f.regexp_replace(f.col('NoiseLevel'), "'quiet'|u'quiet'",'1'))
bz11 = bz10.withColumn('NoiseLevel',f.regexp_replace(f.col('NoiseLevel'), "'average'|u'average'",'2'))
bz12 = bz11.withColumn('NoiseLevel',f.regexp_replace(f.col('NoiseLevel'), "'very_loud'|u'very_loud'",'4'))
bz13 = bz12.withColumn('NoiseLevel',f.regexp_replace(f.col('NoiseLevel'), "'loud'|u'loud'",'3'))
bz14 = bz13.withColumn('Smoking',f.regexp_replace(f.col('Smoking'), "'no'|u'no'",'1'))
bz15 = bz14.withColumn('Smoking',f.regexp_replace(f.col('Smoking'), "'outdoor'|u'outdoor'",'2'))
bz16 = bz15.withColumn('Smoking',f.regexp_replace(f.col('Smoking'), "'yes'|u'yes'",'3'))
bz17 = bz16.withColumn('WiFi',f.regexp_replace(f.col('WiFi'), "'no'|u'no'",'1'))
bz18 = bz17.withColumn('WiFi',f.regexp_replace(f.col('WiFi'), "'paid'|u'paid'",'2'))
bz19 = bz18.withColumn('WiFi',f.regexp_replace(f.col('WiFi'), "'free'|u'free'",'3'))
bz20 = bz19.withColumn('RestaurantsAttire',f.regexp_replace(f.col('RestaurantsAttire'), "'casual'|u'casual'",'1'))
bz21 = bz20.withColumn('RestaurantsAttire',f.regexp_replace(f.col('RestaurantsAttire'), "'dressy'|u'dressy'",'2'))
bz22 = bz21.withColumn('RestaurantsAttire',f.regexp_replace(f.col('RestaurantsAttire'), "'formal'|u'formal'",'3'))
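As an aside, the same chained replacements could be expressed more compactly with a dictionary-driven loop; a minimal sketch under the same mappings used above (the names ordinal_maps and bz_enc are illustrative; bz3 and cols come from the earlier cells):
# Dictionary-driven equivalent of the bz4..bz22 chain above:
# column -> {regex pattern: ordinal code}, applied in insertion order
# (note 'very_loud' is handled before 'loud', as in the original chain).
ordinal_maps = {
    'AgesAllowed': {"u'allages'": '0', "u'19plus'": '19'},
    'Alcohol': {"'none'|u'none'": '0', "'beer_and_wine'|u'beer_and_wine'": '1', "'full_bar'|u'full_bar'": '2'},
    'NoiseLevel': {"'quiet'|u'quiet'": '1', "'average'|u'average'": '2', "'very_loud'|u'very_loud'": '4', "'loud'|u'loud'": '3'},
    'Smoking': {"'no'|u'no'": '1', "'outdoor'|u'outdoor'": '2', "'yes'|u'yes'": '3'},
    'WiFi': {"'no'|u'no'": '1', "'paid'|u'paid'": '2', "'free'|u'free'": '3'},
    'RestaurantsAttire': {"'casual'|u'casual'": '1', "'dressy'|u'dressy'": '2', "'formal'|u'formal'": '3'},
}
bz_enc = bz3.replace({'True': '2', 'False': '1', 'None': '0', 'null': '0'}, subset=cols)
for column, mapping in ordinal_maps.items():
    for pattern, code in mapping.items():
        bz_enc = bz_enc.withColumn(column, f.regexp_replace(f.col(column), pattern, code))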
#[bz22.groupBy(cols[x]).count().show() for x in range(len(cols))]
bz22.createOrReplaceTempView('bz22')
bz23 = spark.sql('''
SELECT A.*,
IFNULL(B.tips_counter,0) AS tips_counter_bz,
IFNULL(B.total_compliments,0) AS total_compliments_bz
FROM bz22 as A
LEFT JOIN tp_bz as B
ON A.business_id = B.business_id
''')
bz23.columns
bz23.write \
.format('csv') \
.mode('overwrite') \
.option('sep', ',') \
.option('header', True) \
.save('output/yelp_bz.csv')
bz23.createOrReplaceTempView('bz')
rv_raw.createOrReplaceTempView('rv')
# Transform variables to be used in the distance matrix - hierarchical clustering
# Arrays to numeric - the list of friend "id"s becomes friends_count
# Arrays to numeric - the list of elite "year"s becomes elite_count
# Date to numeric year - "yelping_since" becomes "year_since"
from pyspark.sql.functions import col, count, explode, split, array
from pyspark.sql.types import ArrayType, IntegerType
usr_raw1 = usr_raw.withColumn('friends_count', f.size(f.split(f.col('friends'), ' ')))
usr_raw1 = usr_raw1.withColumn('elite_count', f.size(f.split(f.col('elite'), ',')))
usr_raw1 = usr_raw1.withColumn('year_since', f.year('yelping_since'))
#usr_raw1.show()
usr_raw1.createOrReplaceTempView('usr_raw1')
usr_base = spark.sql('''
SELECT
A.user_id,
A.average_stars,
A.compliment_cool,
A.compliment_cute,
A.compliment_funny,
A.compliment_hot,
A.compliment_list,
A.compliment_more,
A.compliment_note,
A.compliment_photos,
A.compliment_plain,
A.compliment_profile,
A.compliment_writer,
A.cool,
A.elite_count,
A.fans,
A.friends_count,
A.funny,
A.review_count as review_count_usr,
A.useful,
A.year_since,
B.business_id
FROM usr_raw1 as A LEFT JOIN rv as B
ON A.user_id = B.user_id
''')
usr_base.createOrReplaceTempView('usr_base')
spark.sql('''
SELECT
COUNT(DISTINCT user_id)
FROM usr_base
''').show()
usr_base2 = spark.sql('''
SELECT
A.business_id,
B.*
FROM bz as A
LEFT JOIN usr_base as B
ON A.business_id = B.business_id
''')
usr_base2.createOrReplaceTempView('usr_base2')
spark.sql('''
SELECT
COUNT(DISTINCT user_id)
FROM usr_base2
''').show()
# Identify the user_ids only of users who reviewed open businesses in Toronto
usr_base3 = spark.sql('''
SELECT
DISTINCT user_id
FROM usr_base2
''')
usr_base3.createOrReplaceTempView('usr_base3')
# Select only the users who reviewed open businesses in Toronto
usr_base4 = spark.sql('''
SELECT
A.user_id,
A.average_stars,
A.compliment_cool,
A.compliment_cute,
A.compliment_funny,
A.compliment_hot,
A.compliment_list,
A.compliment_more,
A.compliment_note,
A.compliment_photos,
A.compliment_plain,
A.compliment_profile,
A.compliment_writer,
A.cool,
A.elite_count,
A.fans,
A.friends_count,
A.funny,
A.review_count AS review_count_usr,
A.useful,
A.year_since
FROM usr_raw1 as A LEFT JOIN usr_base3 as B
ON A.user_id = B.user_id
WHERE B.user_id <> ''
''')
usr_base4.createOrReplaceTempView('usr_base4')
spark.sql('''
SELECT
COUNT(DISTINCT user_id)
FROM usr_raw1
''').show()
spark.sql('''
SELECT
COUNT(DISTINCT user_id)
FROM usr_base4
''').show()
usr = spark.sql('''
SELECT A.*,
IFNULL(B.tips_counter,0) AS tips_counter,
IFNULL(B.total_compliments,0) AS total_compliments
FROM usr_base4 as A
LEFT JOIN tp_usr as B
ON A.user_id = B.user_id
''')
usr.count()
usr.write \
.format('csv') \
.mode('overwrite') \
.option('sep', ',') \
.option('header', True) \
.save('output/yelp_usr.csv')
usr.createOrReplaceTempView('usr')
usr.columns
Consolidation of the USER + BUSINESS + REVIEWS datasets (keeping only the rating, which will be the response variable used in the classification).
After the dataset is consolidated, it will be read back in Python and the response variable converted to binary:
- Ratings greater than or equal to 4 - good (1)
- Ratings lower than 4 - bad (0)
In addition, only the numeric variables will be kept for training the neural network.
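A minimal sketch of that binarization step, assuming the consolidated output/yelp.csv written later in this notebook and its stars_rv column (the names yelp_full, yelp_bin and label are illustrative):
# Read the consolidated dataset and derive the binary response variable.
yelp_full = spark.read.csv('output/yelp.csv', header=True, inferSchema=True)
yelp_bin = yelp_full.withColumn('label', f.when(f.col('stars_rv') >= 4, 1).otherwise(0))
yelp_bin.groupBy('label').count().show()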
usr_clusters = spark.read.csv('output/usr_cluster.csv', header = True)
usr_clusters.createOrReplaceTempView('usr_clusters')
#Join the cluster number onto the user dataset
usr_clusters2 = spark.sql("""
SELECT A.*,
B.cluster_usr
FROM usr as A
LEFT JOIN usr_clusters as B
ON A.user_id = B.user_id
""")
usr_clusters2.createOrReplaceTempView('usr_clusters2')
#select the reviews from those users
bf1 = spark.sql("""
SELECT
B.*,
A.business_id,
A.stars as stars_rv,
YEAR(A.date) AS year_rv
FROM usr_clusters2 as B
LEFT JOIN rv as A
ON B.user_id = A.user_id
""")
bf1.columns
bf1.createOrReplaceTempView('bf1')
rv_df = spark.sql("""
SELECT stars_rv, year_rv, count(stars_rv) as qtde_rv
FROM bf1 as A
GROUP BY stars_rv, year_rv
""").toPandas()
rv_df.groupby(['year_rv','stars_rv'])['qtde_rv'].agg('sum').unstack('stars_rv').plot(kind = 'bar', stacked = True, figsize = (25,6))
bf2 = spark.sql('''
SELECT A.*,
B.categories,
B.latitude,
B.longitude,
B.name,
B.review_count,
B.stars,
B.AcceptsInsurance,
B.AgesAllowed,
B.Alcohol,
B.BYOB,
B.BikeParking,
B.BusinessAcceptsCreditCards,
B.ByAppointmentOnly,
B.Caters,
B.CoatCheck,
B.Corkage,
B.DogsAllowed,
B.DriveThru,
B.GoodForDancing,
B.GoodForKids,
B.HappyHour,
B.HasTV,
B.NoiseLevel,
B.OutdoorSeating,
B.RestaurantsAttire,
B.RestaurantsDelivery,
B.RestaurantsGoodForGroups,
B.RestaurantsPriceRange2,
B.RestaurantsReservations,
B.RestaurantsTableService,
B.RestaurantsTakeOut,
B.Smoking,
B.WheelchairAccessible,
B.WiFi,
B.tips_counter_bz,
B.total_compliments_bz
FROM bf1 as A
RIGHT JOIN bz as B
ON A.business_id = B.business_id
WHERE year_rv >= 2017
''')
bf2.createOrReplaceTempView('bf2')
spark.sql('''
SELECT
COUNT(DISTINCT business_id)
FROM bf2
''').show()
bf2.columns
bf2.write \
.format('csv') \
.mode('overwrite') \
.option('sep', ',') \
.option('header', True) \
.save('output/yelp.csv')
#Function to remove punctuation and normalize the words
def word_clean(sdf,col,new_col):
rv1 = sdf.withColumn(new_col,f.regexp_replace(f.col(col), "'d", " would"))
rv2 = rv1.withColumn(new_col,f.regexp_replace(f.col(new_col), "'ve", " have"))
rv3 = rv2.withColumn(new_col,f.regexp_replace(f.col(new_col), "'s", " is"))
rv4 = rv3.withColumn(new_col,f.regexp_replace(f.col(new_col), "'re", " are"))
rv5 = rv4.withColumn(new_col,f.regexp_replace(f.col(new_col), "n't", " not"))
rv6 = rv5.withColumn(new_col,f.regexp_replace(f.col(new_col), '\W+', " "))
rv7 = rv6.withColumn(new_col,f.lower(f.col(new_col)))
return rv7
rv_raw.columns
usr_clusters2.createOrReplaceTempView('usr_clusters2')
rv_raw.createOrReplaceTempView('rv_raw')
spark.sql('''
SELECT
count(*)
FROM rv_raw
''').show()
words = spark.sql('''
SELECT
A.review_id,
A.business_id,
A.user_id,
A.text,
max(B.text) as text_tp
FROM rv_raw as A
LEFT JOIN tp as B
ON A.user_id = B.user_id and A.business_id = B.business_id
GROUP BY 1, 2, 3, 4
''')
words.createOrReplaceTempView('words')
spark.sql('''
SELECT
count(*)
FROM words
''').show()
bz.columns
words2 = spark.sql('''
SELECT A.*
FROM bz22 as B
LEFT JOIN words as A
ON A.business_id = B.business_id
''')
words2.count()
words2.createOrReplaceTempView('words2')
words3 = spark.sql('''
SELECT A.*,
B.cluster_usr
FROM words2 as A
LEFT JOIN usr_clusters2 as B
ON A.user_id = B.user_id
''')
words3.count()
words4 = word_clean(words3,'text','text_clean')
words5 = word_clean(words4,'text_tp','text_clean_tp')
words5.show(4)
from pyspark.sql.functions import split, regexp_replace, col, stddev
words6 = words5.select("cluster_usr", split(
"text_clean", r"[\s.]+").alias("text2"), split("text_clean_tp", r"[\s.]+").alias("text_tp2"))
STOPWORDS REMOVER to drop stop words, which would not contribute to the word analysis since they convey no meaning for the clusters.
#Java locale setting, which affects the StopWords package
locale = sc._jvm.java.util.Locale
locale.setDefault(locale.forLanguageTag("en-US"))
remover = StopWordsRemover(inputCol="text2", outputCol="text_filtered")
words7 = remover.transform(words6)
words7.show(5)
from pyspark.sql.functions import concat_ws
words8 = words7.select("cluster_usr", "text_filtered",
concat_ws(' ', "text_filtered").alias("text_filtered2"))
words9 = words8.withColumn('wordCount', f.size(f.split(f.col('text_filtered2'),' ')))
words10 = words9.withColumn('word', f.explode(f.split(f.col('text_filtered2'), ' ')))\
.groupBy('word','cluster_usr')\
.count()\
.sort('count', ascending = False)
words10.show()
words10.createOrReplaceTempView('words10')
cls = []
#for i in tqdm(range(1, 11)): # Library that shows processing progress, but it is not installed on the AWS server
for i in range(1, 11):
cls.append(spark.sql(f'''
SELECT word,
count
FROM words10
WHERE word IS NOT NULL
AND cluster_usr == {i}
ORDER BY count DESC limit 50
'''))
cls[i-1] = [tuple(x) for x in cls[i-1].toPandas().to_numpy()]
cluster = 1
for cl in cls:
print(f"Cluster {cluster}")
plt.figure(figsize=(10, 10))
plt.imshow(WordCloud().generate(str(cl).replace("'", "")), interpolation='bilinear')
plt.axis('off')
plt.show()
cluster += 1
base_mapas = spark.sql('''
SELECT A.*, B.latitude, B.longitude
FROM bz22 as B
LEFT JOIN words as A
ON A.business_id = B.business_id
''')
base_mapas.columns
base_mapas.createOrReplaceTempView('base_mapas')
mapa1 = spark.sql("""
SELECT latitude,
longitude
FROM base_mapas
WHERE latitude is not null
AND longitude is not null
""")
Finding the central latitude and longitude point of the map
spark.sql("""
SELECT avg(latitude) as avg_lat,
avg(longitude) as avg_long
FROM base_mapas
""").show()
import folium
from folium import plugins
mapa = folium.Map(location=[43.673, -79.391],
zoom_start=11,
tiles='Stamen Toner')
# OpenStreetMap, Stamen Terrain, Stamen Toner
mapa
mapa1_pd = mapa1.toPandas()  # convert once instead of calling toPandas() twice
lat = mapa1_pd['latitude'].values
lon = mapa1_pd['longitude'].values
coordenadas = []
for la, lo in zip(lat, lon):
coordenadas.append([la,lo])
mapa.add_child(plugins.HeatMap(coordenadas))